{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 生成用户日志\n",
    "\n",
    "本代码的目的是把用户分组，并在每个组中统计用户的行为日志，以方便后续的并行化处理。\n",
    "\n",
    "This code aims to group users into serveral groups, then statistic the user behaivors into each group. This process can simplify the operation in multi-processing  "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import multiprocessing as mp\n",
    "import time\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "\n",
    "\n",
    "\n",
    "def reduce_mem_usage(df):\n",
    "    \"\"\" iterate through all the columns of a dataframe and modify the data type\n",
    "        to reduce memory usage.        \n",
    "    \"\"\"\n",
    "    start_mem = df.memory_usage().sum() \n",
    "    print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))\n",
    "    \n",
    "    for col in df.columns:\n",
    "        col_type = df[col].dtype\n",
    "        \n",
    "        if col_type != object:\n",
    "            c_min = df[col].min()\n",
    "            c_max = df[col].max()\n",
    "            if str(col_type)[:3] == 'int':\n",
    "                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:\n",
    "                    df[col] = df[col].astype(np.int8)\n",
    "                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:\n",
    "                    df[col] = df[col].astype(np.int16)\n",
    "                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:\n",
    "                    df[col] = df[col].astype(np.int32)\n",
    "                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:\n",
    "                    df[col] = df[col].astype(np.int64)  \n",
    "            else:\n",
    "                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:\n",
    "                    df[col] = df[col].astype(np.float16)\n",
    "                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:\n",
    "                    df[col] = df[col].astype(np.float32)\n",
    "                else:\n",
    "                    df[col] = df[col].astype(np.float64)\n",
    "        else:\n",
    "            df[col] = df[col].astype('category')\n",
    "\n",
    "    end_mem = df.memory_usage().sum() \n",
    "    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))\n",
    "    print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))\n",
    "    \n",
    "    return df\n",
    "\n",
    "def generate_logs_for_each_group(matrix, q):\n",
    "    user_log = dict()\n",
    "    for row in matrix:\n",
    "        user_log.setdefault(row[0], [])\n",
    "        user_log[row[0]].append(row[1])\n",
    "    print('This batc is finished')\n",
    "    q.put(user_log)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "CPU_NUMS = 8"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# round2 train的路径\n",
    "path = '../ECommAI_EUIR_round2_train_20190816/'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Memory usage of dataframe is 2575214592.00 MB\n",
      "Memory usage after optimization is: 1046181196.00 MB\n",
      "Decreased by 59.4%\n",
      "This batc is finished\n",
      "This batc is finished\n",
      "This batc is finished\n",
      "This batc is finished\n",
      "This batc is finished\n",
      "This batc is finished\n",
      "This batc is finished\n",
      "This batc is finished\n",
      "Waiting for the son processing\n",
      "This batc is finished\n",
      "Over, the time cost is:6.490148067474365\n"
     ]
    }
   ],
   "source": [
    "data = reduce_mem_usage(pd.read_csv(path+'user_behavior.csv', header=None))\n",
    "user = pd.read_csv(path+'user.csv', header=None)\n",
    "item = pd.read_csv(path+'item.csv', header=None)\n",
    "\n",
    "data['day'] = data[3] // 86400\n",
    "data['hour'] = data[3] // 3600 % 24\n",
    "\n",
    "data = data.drop(3, axis=1)\n",
    "\n",
    "data.columns = ['userID','itemID','behavoir','day','hour']\n",
    "user.columns = ['userID', 'sex', 'age', 'ability']\n",
    "item.columns = ['itemID', 'category', 'shop', 'band']\n",
    "\n",
    "data = data.drop_duplicates(['userID','itemID'],keep=\"last\")\n",
    "data = data.sort_values(['day','hour'], ascending=True).reset_index(drop=True)\n",
    "\n",
    "users = list(set(user['userID']))\n",
    "\n",
    "user_groups = [users[i: i + len(users) // CPU_NUMS] for i in range(0, len(users), len(users) // CPU_NUMS)]\n",
    "\n",
    "q = mp.Queue()\n",
    "for groupID in range(len(user_groups)):\n",
    "    matrix = data[data['userID'].isin(user_groups[groupID])][['userID','itemID']].values\n",
    "    task = mp.Process(target=generate_logs_for_each_group, args=(matrix, q, ))\n",
    "    task.start()\n",
    "    \n",
    "start_time = time.time()\n",
    "print('Waiting for the son processing')\n",
    "while q.qsize() != len(user_groups):\n",
    "    pass\n",
    "end_time = time.time()\n",
    "print(\"Over, the time cost is:\"  + str(end_time - start_time))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "for i in range(len(user_groups)):\n",
    "    temp = q.get()\n",
    "    f = open('full_logs/userlogs_group' + str(i) + '.txt','w')\n",
    "    f.write(str(temp))\n",
    "    f.close()\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.1"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
